diff --git a/scripts/combine_google_pdp_logs.sh b/scripts/combine_google_pdp_logs.sh
new file mode 100644
index 0000000..e080cee
--- /dev/null
+++ b/scripts/combine_google_pdp_logs.sh
@@ -0,0 +1,219 @@
+#!/usr/bin/env bash
+
+#
+# Copyright 2020-2024, Institute for Systems Biology
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pull_a_set() {
+    FUNC_LOG_LOC=${1}
+    FUNC_FILE_CORE=${2}
+    FUNC_HEADER_FIELD=${3}
+    FUNC_MAX_FILES_PER_RUN=${4}
+    FUNC_FILE_LIST=${5}
+    FUNC_FLOW_NAME=${6}
+    FUNC_COMBINED_PRESORT=${7}
+    FUNC_COMBINED=${8}
+    FUNC_LOG_LOC_PRE=${9}
+    FUNC_LOG_LOC_POST=${10}
+    FUNC_BQ_BUCK=${11}
+    FUNC_BQ_TABLE=${12}
+    FUNC_SCHEMA_FILE=${13}
+
+    #
+    # Generate a list of the record files in the bucket for this particular statistic:
+    #
+
+    gsutil ls -R ${FUNC_LOG_LOC}/* 2>/dev/null | grep "${FUNC_FILE_CORE}" | head -n ${FUNC_MAX_FILES_PER_RUN} > ${FUNC_FILE_LIST} || true
+
+    #
+    # Get the column header line into the top of the file:
+    #
+
+    if [ -s "${FUNC_FILE_LIST}" ]; then
+        ONE_FILE=`head -n 1 ${FUNC_FILE_LIST}`
+        gsutil cp ${ONE_FILE} ${FUNC_FILE_CORE}_header_file_${FUNC_FLOW_NAME}.txt
+        head -n 1 ${FUNC_FILE_CORE}_header_file_${FUNC_FLOW_NAME}.txt > ${FUNC_COMBINED_PRESORT}
+        rm ${FUNC_FILE_CORE}_header_file_${FUNC_FLOW_NAME}.txt
+
+        #
+        # Maybe get a backup date to assign a zero if we wind up with no data?
+        #
+        echo ${ONE_FILE}
+
+        #
+        # Haul in the files:
+        #
+
+        TOTAL=`cat ${FUNC_FILE_LIST} | wc -l`
+        N=0
+        while IFS= read -r LINE
+        do
+            gsutil cat ${LINE} | grep -v ${FUNC_HEADER_FIELD} > ${FUNC_FILE_CORE}_tmp_${FUNC_FLOW_NAME}.csv
+
+            #
+            # Rewrite the timestamp in the first column. The loader could not parse e.g. '12/04/2024 13:56:00'
+            # as a timestamp; the required format is YYYY-MM-DD HH:MM[:SS[.SSSSSS]] or YYYY/MM/DD, so convert
+            # each record before loading:
+            #
+            while IFS= read -r TLINE; do
+                LINE_TIME=`echo ${TLINE} | cut -d, -f1 | sed -e 's/"//g'`
+                LINE_CDR=`echo ${TLINE} | cut -d, -f2-`
+                LINE_STAMP=`date -d"${LINE_TIME}" +"%Y-%m-%d %H:%M:%S"`
+                echo "\"${LINE_STAMP}\",${LINE_CDR}" >> ${FUNC_COMBINED_PRESORT}
+            done < ${FUNC_FILE_CORE}_tmp_${FUNC_FLOW_NAME}.csv
+            rm -f ${FUNC_FILE_CORE}_tmp_${FUNC_FLOW_NAME}.csv
+
+            #
+            # Move the processed file from the "pre" location to the "post" location:
+            #
+            echo "${LINE}"
+            TARG_LINE=`echo "${LINE}" | sed -e "s/${FUNC_LOG_LOC_PRE}/${FUNC_LOG_LOC_POST}/"`
+            echo "${TARG_LINE}"
+            gsutil mv ${LINE} ${TARG_LINE}
+            N=$((N+1))
+            echo "Finished ${N} of ${TOTAL}"
+        done < ${FUNC_FILE_LIST}
+
+        #
+        # Sort and uniq the combined list, keeping the header line on top:
+        #
+
+        FILE_HEAD=`head -n 1 ${FUNC_COMBINED_PRESORT}`
+        echo ${FILE_HEAD} > ${FUNC_COMBINED}
+        grep -v "${FILE_HEAD}" ${FUNC_COMBINED_PRESORT} | sort | uniq >> ${FUNC_COMBINED}
+
+        #
+        # Up to the bucket and into BQ!
+        #
+
+        gsutil cp ${FUNC_COMBINED} ${FUNC_BQ_BUCK}
+        LINECOUNT=`cat ${FUNC_COMBINED} | wc -l`
+        if [ ${LINECOUNT} -eq 1 ]; then
+            echo "There are no records, so skipping the BQ load"
+        else
+            # Echo the arguments so the run log shows what is being loaded:
+            echo ${FUNC_LOG_LOC}
+            echo ${FUNC_FILE_CORE}
+            echo ${FUNC_HEADER_FIELD}
+            echo ${FUNC_MAX_FILES_PER_RUN}
+            echo ${FUNC_FILE_LIST}
+            echo ${FUNC_FLOW_NAME}
+            echo ${FUNC_COMBINED_PRESORT}
+            echo ${FUNC_COMBINED}
+            echo ${FUNC_LOG_LOC_PRE}
+            echo ${FUNC_LOG_LOC_POST}
+            echo ${FUNC_BQ_BUCK}
+            echo ${FUNC_BQ_TABLE}
+            echo ${FUNC_SCHEMA_FILE}
+            bq load --source_format=CSV --field_delimiter=',' --skip_leading_rows=1 "${FUNC_BQ_TABLE}" "${FUNC_BQ_BUCK}" "${FUNC_SCHEMA_FILE}"
+        fi
+        rm -f ${FUNC_COMBINED_PRESORT} ${FUNC_COMBINED}
+    fi
+
+    rm ${FUNC_FILE_LIST}
+
+}
+
+if [[ $# -ne 1 ]]; then
+    echo "Usage: combine_google_pdp_logs.sh <flow-name>"
+    exit 1
+fi
+
+FLOW_NAME=$1
+
+#
+# The per-flow environment file is expected to define LOG_LOC, LOG_LOC_PRE, LOG_LOC_POST,
+# MAX_FILES_PER_RUN, BQ_BUCK, BYTE_BQ_TABLE, LIST_BQ_TABLE, and READ_BQ_TABLE:
+#
+source ${HOME}/scripts/setPDPEnvCombine-${FLOW_NAME}.sh
+
+cd ${HOME}/pdpProcess
+TODAY=`date +%Y-%m-%d-%H-%M`
+FILE_LIST_SENT_BYTE=curr-byte-${FLOW_NAME}-${TODAY}.txt
+FILE_LIST_LIST_OBJ=curr-list-${FLOW_NAME}-${TODAY}.txt
+FILE_LIST_REQ_OBJ=curr-request-${FLOW_NAME}-${TODAY}.txt
+COMBINED_BYTE_PRESORT=combined-byte-presort-${FLOW_NAME}-${TODAY}.csv
+COMBINED_LIST_PRESORT=combined-list-presort-${FLOW_NAME}-${TODAY}.csv
+COMBINED_REQ_PRESORT=combined-request-presort-${FLOW_NAME}-${TODAY}.csv
+COMBINED_BYTE=combined-byte-${FLOW_NAME}-${TODAY}.csv
+COMBINED_LIST=combined-list-${FLOW_NAME}-${TODAY}.csv
+COMBINED_REQ=combined-request-${FLOW_NAME}-${TODAY}.csv
+BYTE_HEADER_FIELD=read_object_sent_bytes_count
+LIST_HEADER_FIELD=list_objects_request_count
+READ_HEADER_FIELD=read_object_request_count
+LIST_FILE_CORE=list_objects_request_count
+READ_OBJECT_FILE_CORE=read_object_request_count
+READ_BYTE_FILE_CORE=read_object_sent_byte_count
+BYTE_BQ_BUCK=${BQ_BUCK}/${COMBINED_BYTE}
+LIST_BQ_BUCK=${BQ_BUCK}/${COMBINED_LIST}
+READ_BQ_BUCK=${BQ_BUCK}/${COMBINED_REQ}
+LIST_SCHEMA_FILE="myListSchema-${FLOW_NAME}-${TODAY}.json"
+BYTE_SCHEMA_FILE="myByteSchema-${FLOW_NAME}-${TODAY}.json"
+READ_SCHEMA_FILE="myReqSchema-${FLOW_NAME}-${TODAY}.json"
+
+cat << EOF1 > ${BYTE_SCHEMA_FILE}
+[
+  {
+    "name": "timestamp",
+    "type": "TIMESTAMP",
+    "mode": "REQUIRED"
+  },
+  {
+    "name": "${BYTE_HEADER_FIELD}",
+    "type": "INTEGER",
+    "mode": "REQUIRED"
+  }
+]
+EOF1
+
+pull_a_set ${LOG_LOC} ${READ_BYTE_FILE_CORE} ${BYTE_HEADER_FIELD} ${MAX_FILES_PER_RUN} ${FILE_LIST_SENT_BYTE} \
+           ${FLOW_NAME} ${COMBINED_BYTE_PRESORT} ${COMBINED_BYTE} ${LOG_LOC_PRE} ${LOG_LOC_POST} \
+           ${BYTE_BQ_BUCK} ${BYTE_BQ_TABLE} ${BYTE_SCHEMA_FILE}
+
+rm ${BYTE_SCHEMA_FILE}
+
+cat << EOF2 > ${LIST_SCHEMA_FILE}
+[
+  {
+    "name": "timestamp",
+    "type": "TIMESTAMP",
+    "mode": "REQUIRED"
+  },
+  {
+    "name": "${LIST_HEADER_FIELD}",
+    "type": "INTEGER",
+    "mode": "REQUIRED"
+  }
+]
+EOF2
+
+pull_a_set ${LOG_LOC} ${LIST_FILE_CORE} ${LIST_HEADER_FIELD} ${MAX_FILES_PER_RUN} ${FILE_LIST_LIST_OBJ} \
+           ${FLOW_NAME} ${COMBINED_LIST_PRESORT} ${COMBINED_LIST} ${LOG_LOC_PRE} ${LOG_LOC_POST} \
+           ${LIST_BQ_BUCK} ${LIST_BQ_TABLE} ${LIST_SCHEMA_FILE}
+
+rm ${LIST_SCHEMA_FILE}
+
+cat << EOF3 > ${READ_SCHEMA_FILE}
+[
+  {
+    "name": "timestamp",
+    "type": "TIMESTAMP",
+    "mode": "REQUIRED"
+  },
+  {
+    "name": "${READ_HEADER_FIELD}",
+    "type": "INTEGER",
+    "mode": "REQUIRED"
+  }
+]
+EOF3
+
+
+pull_a_set ${LOG_LOC} ${READ_OBJECT_FILE_CORE} ${READ_HEADER_FIELD} ${MAX_FILES_PER_RUN} ${FILE_LIST_REQ_OBJ} \
+           ${FLOW_NAME} ${COMBINED_REQ_PRESORT} ${COMBINED_REQ} ${LOG_LOC_PRE} ${LOG_LOC_POST} \
+           ${READ_BQ_BUCK} ${READ_BQ_TABLE} ${READ_SCHEMA_FILE}
+
+rm ${READ_SCHEMA_FILE}
+
+
diff --git a/scripts/pull_monitoring.py b/scripts/pull_monitoring.py
new file mode 100644
index 0000000..35dd43e
--- /dev/null
+++ b/scripts/pull_monitoring.py
@@ -0,0 +1,72 @@
+import functions_framework
+from google.cloud import storage
+from google.cloud import monitoring_v3
+import datetime
+import os
+
+BUCKET_NAME = os.environ['BUCKET_NAME']
+LOGGING_BUCKET_NAME = os.environ['LOGGING_BUCKET_NAME']
+PROJECT_ID = os.environ['PROJECT_ID']
+
+# Write the CSV text to an object in the logging bucket:
+def csv_to_bucket(storage_client, logging_bucket_name, file_name, csv):
+    bucket = storage_client.bucket(logging_bucket_name)
+    blob = bucket.blob(file_name)
+    blob.upload_from_string(csv, content_type='text/csv')
+    return
+
+# Pull the time series matching the filter over the interval and render it as a
+# two-column CSV (timestamp, metric value):
+def generate_csv(project_id, monitoring_client, metric_filter, interval, metric_name):
+    results = monitoring_client.list_time_series(
+        request={
+            "name": f"projects/{project_id}",
+            "filter": metric_filter,
+            "interval": interval,
+            "view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
+        }
+    )
+
+    csv = '{0},{1}\n'.format('timestamp', metric_name)
+    for result in results:
+        for point in result.points:
+            data_point = point.interval.start_time.strftime("%m/%d/%Y %H:%M:%S")
+            csv += '{0},{1}\n'.format(data_point, point.value.int64_value)
+
+    return csv
+
+
+@functions_framework.http
+def collect_stats(request):
+    monitoring_client = monitoring_v3.MetricServiceClient()
+    storage_client = storage.Client()
+    #
+    # I believe the interval is of range (startTime, endTime] (start time not included).
+    # Get the stats for the previous hour, i.e. :01 of the last hour through :00 of the current hour:
+    #
+    last_hour = datetime.datetime.utcnow().strftime("%d/%m/%Y %H:00:00")
+    top_hour_stamp = int(datetime.datetime.strptime(last_hour, "%d/%m/%Y %H:%M:%S").timestamp())
+    last_top_hour_stamp = top_hour_stamp - (60 * 60)
+    interval = monitoring_v3.TimeInterval(
+        {
+            "end_time": {"seconds": top_hour_stamp},
+            "start_time": {"seconds": last_top_hour_stamp},
+        }
+    )
+
+    list_objects_request_count_filter = f'metric.type = "storage.googleapis.com/api/request_count" AND resource.type = "gcs_bucket" AND resource.label.bucket_name = "{BUCKET_NAME}" AND metric.label.method = "ListObjects" AND metric.label.response_code = "OK"'
+    read_object_request_count_filter = f'metric.type = "storage.googleapis.com/api/request_count" AND resource.type = "gcs_bucket" AND resource.label.bucket_name = "{BUCKET_NAME}" AND metric.label.method = "ReadObject" AND metric.label.response_code = "OK"'
+    read_object_sent_bytes_count_filter = f'metric.type = "storage.googleapis.com/network/sent_bytes_count" AND resource.type = "gcs_bucket" AND resource.label.bucket_name = "{BUCKET_NAME}" AND metric.label.method = "ReadObject" AND metric.label.response_code = "OK"'
+
+    list_objects_request_count_csv = generate_csv(PROJECT_ID, monitoring_client, list_objects_request_count_filter, interval, "list_objects_request_count")
+    read_object_request_count_csv = generate_csv(PROJECT_ID, monitoring_client, read_object_request_count_filter, interval, "read_object_request_count")
+    read_object_sent_byte_count_csv = generate_csv(PROJECT_ID, monitoring_client, read_object_sent_bytes_count_filter, interval, "read_object_sent_bytes_count")
+
+    acquire_time = datetime.datetime.utcnow()
+    acquire_date = acquire_time.strftime("%m-%d-%Y")
+    acquire_hms = acquire_time.strftime("%H:%M:%S")
+
+    # E.g. idc-legacy-data-usage-logs/public-datasets-idc/12-04-2024/list_objects_request_count_14:00:07
+    csv_to_bucket(storage_client, LOGGING_BUCKET_NAME, f'{BUCKET_NAME}/{acquire_date}/list_objects_request_count_{acquire_hms}', list_objects_request_count_csv)
+    csv_to_bucket(storage_client, LOGGING_BUCKET_NAME, f'{BUCKET_NAME}/{acquire_date}/read_object_request_count_{acquire_hms}', read_object_request_count_csv)
+    csv_to_bucket(storage_client, LOGGING_BUCKET_NAME, f'{BUCKET_NAME}/{acquire_date}/read_object_sent_byte_count_{acquire_hms}', read_object_sent_byte_count_csv)
+
+    return "Successful\n"
+
diff --git a/scripts/pull_monitoring_requirements.txt b/scripts/pull_monitoring_requirements.txt
new file mode 100644
index 0000000..8bb828d
--- /dev/null
+++ b/scripts/pull_monitoring_requirements.txt
@@ -0,0 +1,3 @@
+functions-framework==3.*
+google.cloud.storage
+google.cloud.monitoring
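
Note: combine_google_pdp_logs.sh sources a per-flow environment file, ${HOME}/scripts/setPDPEnvCombine-<flow-name>.sh, that is not part of this change. A minimal sketch of what such a file might contain, inferred from the variables the script reads; every value below is an illustrative placeholder, not a real bucket or table:

    # setPDPEnvCombine-<flow-name>.sh -- placeholder values only, adjust per flow
    LOG_LOC="gs://example-usage-logs-bucket/example-monitored-bucket"   # where the hourly monitoring CSVs land (placeholder)
    LOG_LOC_PRE="example-usage-logs-bucket"                             # path fragment rewritten when a file is processed (placeholder)
    LOG_LOC_POST="example-usage-logs-bucket-processed"                  # replacement fragment processed files are moved to (placeholder)
    MAX_FILES_PER_RUN=500                                               # cap on files combined in one run (placeholder)
    BQ_BUCK="gs://example-staging-bucket/pdp-combined"                  # staging location for the combined CSVs (placeholder)
    BYTE_BQ_TABLE="example_dataset.read_object_sent_bytes_count"        # BigQuery destination tables (placeholders)
    LIST_BQ_TABLE="example_dataset.list_objects_request_count"
    READ_BQ_TABLE="example_dataset.read_object_request_count"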