diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml new file mode 100644 index 0000000..86df08c --- /dev/null +++ b/.github/workflows/build.yml @@ -0,0 +1,74 @@ +name: Build and Release + +on: + push: + branches: [ main ] + tags: [ 'v*' ] + pull_request: + branches: [ main ] + +jobs: + build: + name: Build + runs-on: ubuntu-latest + strategy: + matrix: + goos: [linux, darwin, windows] + goarch: [amd64, arm64] + exclude: + # Exclude Windows on ARM64 as it's less common + - goos: windows + goarch: arm64 + steps: + - name: Checkout code + uses: actions/checkout@v3 + + - name: Set up Go + uses: actions/setup-go@v4 + with: + go-version: '1.21' + check-latest: true + + - name: Build binary + env: + GOOS: ${{ matrix.goos }} + GOARCH: ${{ matrix.goarch }} + run: | + # Set the output binary name with extension based on OS + EXT="" + if [ "${{ matrix.goos }}" = "windows" ]; then + EXT=".exe" + fi + + # Build the binary + go build -v -o "s3usage-${{ matrix.goos }}-${{ matrix.goarch }}${EXT}" ./cmd/s3usage + + - name: Upload build artifact + uses: actions/upload-artifact@v3 + with: + name: s3usage-${{ matrix.goos }}-${{ matrix.goarch }} + path: s3usage-${{ matrix.goos }}-${{ matrix.goarch }}* + if-no-files-found: error + + release: + name: Create Release + needs: build + if: startsWith(github.ref, 'refs/tags/') + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v3 + + - name: Download artifacts + uses: actions/download-artifact@v3 + with: + path: ./artifacts + + - name: Create release + id: create_release + uses: softprops/action-gh-release@v1 + with: + files: ./artifacts/**/* + draft: false + prerelease: false + generate_release_notes: true \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..567609b --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +build/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..0c00334 --- /dev/null +++ b/README.md @@ -0,0 +1,146 @@ 
+# S3Usage - S3 Bucket Usage Monitor for Ceph + +S3Usage is a command-line application that monitors storage usage of S3 buckets in a Ceph cluster. It uses Ceph's RGW Admin Ops API to efficiently retrieve bucket statistics, stores the data in a SQLite database, calculates monthly averages, and provides commands to query historical usage information. + +## Features + +- Connect to Ceph S3 RGW Admin API to efficiently retrieve bucket usage statistics +- Store usage data in a SQLite database +- Calculate monthly average bucket usage +- Display monthly usage for all buckets +- Query historical usage for specific buckets +- Prune old data points while preserving monthly statistics + +## Implementation Details + +This tool uses two different APIs: +- Standard S3 API to list the buckets (as a fallback) +- Ceph RGW Admin Ops API to fetch bucket statistics efficiently + +The Admin API requests are properly signed using AWS Signature v4 authentication, which is required by Ceph RGW. Using the Admin Ops API provides significant performance improvements over listing all objects in buckets, making this tool suitable for monitoring Ceph S3 deployments with many large buckets. + +## Authentication Requirements + +The tool uses proper AWS Signature v4 authentication for Admin API requests, identical to the authentication used by the `radosgw-admin` CLI tool. This requires: + +1. A Ceph user with administrative privileges +2. The access and secret keys for that user +3. 
The correct Ceph RGW endpoint URL + +## Installation + +### From Source + +```bash +git clone https://github.com/thannaske/s3usage.git +cd s3usage +go build -o s3usage ./cmd/s3usage +# Optionally, move to a directory in your PATH +sudo mv s3usage /usr/local/bin/ +``` + +## Usage + +### Environment Variables + +You can configure S3Usage using environment variables: + +- `S3_ENDPOINT`: Ceph S3 endpoint URL (should be the RGW API endpoint) +- `S3_ACCESS_KEY`: S3 access key (requires admin privileges for RGW Admin API) +- `S3_SECRET_KEY`: S3 secret key +- `S3_REGION`: S3 region (default: "default") +- `S3_DB_PATH`: Path to SQLite database (default: `~/.s3usage.db`) + +### Required Permissions + +The access key used must have administrative privileges on the Ceph RGW to access the Admin API endpoints. You can create a user with the appropriate permissions using: + +```bash +radosgw-admin user create --uid=s3usage --display-name="S3 Usage Monitor" --caps="buckets=*;users=*;usage=*;metadata=*;zone=*" +radosgw-admin key create --uid=s3usage --key-type=s3 --gen-access-key --gen-secret +``` + +### Collecting Usage Data + +To collect bucket usage data and store it in the database, use the `collect` command: + +```bash +s3usage collect --endpoint=https://s3.example.com --access-key=YOUR_ACCESS_KEY --secret-key=YOUR_SECRET_KEY +``` + +Or using environment variables: + +```bash +export S3_ENDPOINT=https://s3.example.com +export S3_ACCESS_KEY=YOUR_ACCESS_KEY +export S3_SECRET_KEY=YOUR_SECRET_KEY +s3usage collect +``` + +This command is meant to be scheduled via cron to collect data regularly. + +### Monthly Usage Report + +To display the monthly average usage for all buckets: + +```bash +s3usage list --year=2025 --month=2 +``` + +If no year/month is specified, the previous month's data is shown. 
+ +### Bucket Usage History + +To view historical usage data for a specific bucket: + +```bash +s3usage history my-bucket-name +``` + +This shows a year's worth of historical data for the specified bucket. + +### Pruning Old Data + +To clean up individual data points from months that have already been aggregated into monthly averages: + +```bash +s3usage prune +``` + +This will prompt for confirmation before deleting data. To skip the confirmation: + +```bash +s3usage prune --confirm +``` + +The prune command only removes data points from completed months that already have monthly averages calculated. It preserves: +- All monthly average statistics +- Data points from the current month +- Data points from months without calculated averages + +This helps keep the database size manageable over time without losing valuable statistics. + +## Cron Setup + +To collect data daily, add a cron job: + +```bash +# Edit crontab +crontab -e + +# Add this line to run daily at 23:45 +45 23 * * * /usr/local/bin/s3usage collect --endpoint=https://s3.example.com --access-key=YOUR_ACCESS_KEY --secret-key=YOUR_SECRET_KEY +``` + +## Troubleshooting + +If you encounter authentication issues: + +1. Verify your user has the correct admin capabilities +2. Ensure your endpoint URL is correct (it should point to the RGW API endpoint) +3. Check that you're using the correct access and secret keys +4. 
Verify the region setting matches your Ceph configuration + +## License + +MIT \ No newline at end of file diff --git a/cmd/s3usage/collect.go b/cmd/s3usage/collect.go new file mode 100644 index 0000000..7f979a9 --- /dev/null +++ b/cmd/s3usage/collect.go @@ -0,0 +1,90 @@ +package main + +import ( + "context" + "fmt" + "time" + + "github.com/spf13/cobra" + "github.com/thannaske/s3usage/pkg/ceph" + "github.com/thannaske/s3usage/pkg/db" +) + +var collectCmd = &cobra.Command{ + Use: "collect", + Short: "Collect bucket usage data", + Long: `Collect usage data for all buckets and store it in the database.`, + Run: func(cmd *cobra.Command, args []string) { + // Validate required parameters + if config.S3Endpoint == "" || config.S3AccessKey == "" || config.S3SecretKey == "" { + fmt.Println("Error: Missing required S3 credentials. Please provide --endpoint, --access-key, and --secret-key.") + return + } + + // Initialize the database + database, err := db.NewDB(config.DBPath) + if err != nil { + fmt.Printf("Error connecting to database: %v\n", err) + return + } + defer database.Close() + + err = database.InitDB() + if err != nil { + fmt.Printf("Error initializing database: %v\n", err) + return + } + + // Initialize the S3 client + s3Client, err := ceph.NewS3Client(config) + if err != nil { + fmt.Printf("Error initializing S3 client: %v\n", err) + return + } + + // Get usage data for all buckets + fmt.Println("Collecting bucket usage data...") + usages, err := s3Client.GetAllBucketsUsage(context.Background()) + if err != nil { + fmt.Printf("Error collecting bucket usage data: %v\n", err) + return + } + + // Store usage data in the database + for _, usage := range usages { + err = database.StoreBucketUsage(usage) + if err != nil { + fmt.Printf("Error storing usage data for bucket %s: %v\n", usage.BucketName, err) + continue + } + fmt.Printf("Stored usage data for bucket %s: %d bytes, %d objects\n", + usage.BucketName, usage.SizeBytes, usage.ObjectCount) + } + + // Check if we 
need to calculate monthly averages + now := time.Now() + // If it's the end of the month (last day), calculate monthly averages + if now.Day() == getDaysInMonth(now.Year(), int(now.Month())) { + fmt.Println("Calculating monthly averages...") + err = database.CalculateMonthlyAverages(now.Year(), int(now.Month())) + if err != nil { + fmt.Printf("Error calculating monthly averages: %v\n", err) + return + } + fmt.Println("Monthly averages calculated successfully.") + } + + fmt.Println("Collection completed successfully.") + }, +} + +// getDaysInMonth returns the number of days in a month +func getDaysInMonth(year, month int) int { + // Create a date in the month and go to the 0th day of the next month + t := time.Date(year, time.Month(month+1), 0, 0, 0, 0, 0, time.UTC) + return t.Day() +} + +func init() { + rootCmd.AddCommand(collectCmd) +} diff --git a/cmd/s3usage/list.go b/cmd/s3usage/list.go new file mode 100644 index 0000000..6969c32 --- /dev/null +++ b/cmd/s3usage/list.go @@ -0,0 +1,185 @@ +package main + +import ( + "fmt" + "os" + "sort" + "text/tabwriter" + "time" + + "github.com/spf13/cobra" + "github.com/thannaske/s3usage/pkg/db" +) + +var ( + year int + month int +) + +// formatSize converts bytes to a human-readable format +func formatSize(bytes float64) string { + const ( + _ = iota + KB float64 = 1 << (10 * iota) + MB + GB + TB + PB + ) + + unit := "" + value := bytes + + switch { + case bytes >= PB: + unit = "PB" + value = bytes / PB + case bytes >= TB: + unit = "TB" + value = bytes / TB + case bytes >= GB: + unit = "GB" + value = bytes / GB + case bytes >= MB: + unit = "MB" + value = bytes / MB + case bytes >= KB: + unit = "KB" + value = bytes / KB + default: + unit = "bytes" + } + + if unit == "bytes" { + return fmt.Sprintf("%.0f %s", value, unit) + } + return fmt.Sprintf("%.2f %s", value, unit) +} + +var listCmd = &cobra.Command{ + Use: "list", + Short: "List monthly bucket usage", + Long: `Display monthly average usage statistics for all buckets.`, + 
Run: func(cmd *cobra.Command, args []string) { + // If no year/month specified, use previous month + now := time.Now() + if year == 0 { + if now.Month() == 1 { + year = now.Year() - 1 + month = 12 + } else { + year = now.Year() + month = int(now.Month()) - 1 + } + } + if month == 0 { + month = int(now.Month()) + } + + // Validate month + if month < 1 || month > 12 { + fmt.Println("Error: Month must be between 1 and 12.") + return + } + + // Initialize the database + database, err := db.NewDB(config.DBPath) + if err != nil { + fmt.Printf("Error connecting to database: %v\n", err) + return + } + defer database.Close() + + // Get monthly averages + averages, err := database.GetAllMonthlyAverages(year, month) + if err != nil { + fmt.Printf("Error retrieving monthly averages: %v\n", err) + return + } + + if len(averages) == 0 { + fmt.Printf("No data available for %d-%02d\n", year, month) + return + } + + // Sort by size (largest first) + sort.Slice(averages, func(i, j int) bool { + return averages[i].AvgSizeBytes > averages[j].AvgSizeBytes + }) + + // Print the results + fmt.Printf("Monthly Average Usage for %d-%02d\n\n", year, month) + w := tabwriter.NewWriter(os.Stdout, 0, 0, 3, ' ', tabwriter.TabIndent) + fmt.Fprintln(w, "Bucket\tSize\tObjects\tSamples") + fmt.Fprintln(w, "------\t----\t-------\t-------") + + for _, avg := range averages { + fmt.Fprintf(w, "%s\t%s\t%d\t%d\n", + avg.BucketName, + formatSize(avg.AvgSizeBytes), + int(avg.AvgObjectCount), + avg.DataPoints, + ) + } + w.Flush() + }, +} + +var historyCmd = &cobra.Command{ + Use: "history [bucket-name]", + Short: "Show usage history for a bucket", + Long: `Display historical usage data for a specific bucket.`, + Args: cobra.ExactArgs(1), + Run: func(cmd *cobra.Command, args []string) { + bucketName := args[0] + + // Initialize the database + database, err := db.NewDB(config.DBPath) + if err != nil { + fmt.Printf("Error connecting to database: %v\n", err) + return + } + defer database.Close() + + // Calculate 
date range + now := time.Now() + startTime := time.Date(now.Year()-1, now.Month(), 1, 0, 0, 0, 0, time.UTC) + endTime := time.Date(now.Year(), now.Month()+1, 0, 23, 59, 59, 0, time.UTC) + + // Get usage history + usages, err := database.GetBucketUsage(bucketName, startTime, endTime) + if err != nil { + fmt.Printf("Error retrieving usage history: %v\n", err) + return + } + + if len(usages) == 0 { + fmt.Printf("No usage data available for bucket %s\n", bucketName) + return + } + + // Print the results + fmt.Printf("Usage History for Bucket: %s\n\n", bucketName) + w := tabwriter.NewWriter(os.Stdout, 0, 0, 3, ' ', tabwriter.TabIndent) + fmt.Fprintln(w, "Date\tSize\tObjects") + fmt.Fprintln(w, "----\t----\t-------") + + for _, usage := range usages { + fmt.Fprintf(w, "%s\t%s\t%d\n", + usage.Timestamp.Format("2006-01-02 15:04:05"), + formatSize(float64(usage.SizeBytes)), + usage.ObjectCount, + ) + } + w.Flush() + }, +} + +func init() { + rootCmd.AddCommand(listCmd) + rootCmd.AddCommand(historyCmd) + + // Add flags to the list command + listCmd.Flags().IntVar(&year, "year", 0, "Year to query (default: current year)") + listCmd.Flags().IntVar(&month, "month", 0, "Month to query (1-12, default: current month)") +} diff --git a/cmd/s3usage/main.go b/cmd/s3usage/main.go new file mode 100644 index 0000000..736ef31 --- /dev/null +++ b/cmd/s3usage/main.go @@ -0,0 +1,5 @@ +package main + +func main() { + Execute() +} diff --git a/cmd/s3usage/prune.go b/cmd/s3usage/prune.go new file mode 100644 index 0000000..6297f8a --- /dev/null +++ b/cmd/s3usage/prune.go @@ -0,0 +1,70 @@ +package main + +import ( + "fmt" + "os" + + "github.com/spf13/cobra" + "github.com/thannaske/s3usage/pkg/db" +) + +var ( + // Flag to confirm pruning without prompting + confirm bool +) + +var pruneCmd = &cobra.Command{ + Use: "prune", + Short: "Prune old bucket usage data", + Long: `Remove individual bucket usage data points from months that have already +been aggregated into monthly averages. 
This helps keep the database size manageable +over time while preserving the monthly average statistics. + +Only data from completed months with calculated monthly averages are removed. +Data from the current month and any months without averages are preserved.`, + Run: func(cmd *cobra.Command, args []string) { + // Initialize the database + database, err := db.NewDB(config.DBPath) + if err != nil { + fmt.Printf("Error connecting to database: %v\n", err) + return + } + defer database.Close() + + // If not confirmed, prompt the user + if !confirm { + fmt.Print("This will permanently delete individual data points from months that have " + + "completed and have calculated monthly averages.\n" + + "The monthly average statistics will be preserved.\n" + + "Are you sure you want to continue? (y/N): ") + + var response string + fmt.Scanln(&response) + if response != "y" && response != "Y" { + fmt.Println("Pruning cancelled.") + return + } + } + + // Perform the pruning operation + fmt.Println("Pruning old data points...") + rowsDeleted, err := database.PruneOldData() + if err != nil { + fmt.Printf("Error pruning old data: %v\n", err) + os.Exit(1) + } + + if rowsDeleted == 0 { + fmt.Println("No data to prune. 
All data points are still needed or no monthly averages have been calculated yet.") + } else { + fmt.Printf("Successfully pruned %d data points from completed months.\n", rowsDeleted) + } + }, +} + +func init() { + rootCmd.AddCommand(pruneCmd) + + // Add flags to the prune command + pruneCmd.Flags().BoolVar(&confirm, "confirm", false, "Confirm pruning without prompting") +} diff --git a/cmd/s3usage/root.go b/cmd/s3usage/root.go new file mode 100644 index 0000000..16cd2dd --- /dev/null +++ b/cmd/s3usage/root.go @@ -0,0 +1,71 @@ +package main + +import ( + "fmt" + "os" + "path/filepath" + + "github.com/spf13/cobra" + "github.com/thannaske/s3usage/pkg/models" +) + +var ( + cfgFile string + config models.Config + defaultDB = filepath.Join(os.Getenv("HOME"), ".s3usage.db") +) + +// rootCmd represents the base command when called without any subcommands +var rootCmd = &cobra.Command{ + Use: "s3usage", + Short: "S3 bucket usage monitor for Ceph", + Long: `A CLI tool to monitor and track usage statistics for S3 buckets in Ceph. +It collects and stores usage data in a SQLite database and provides +commands to query historical usage information.`, +} + +// Execute adds all child commands to the root command and sets flags appropriately. 
+func Execute() { + if err := rootCmd.Execute(); err != nil { + fmt.Println(err) + os.Exit(1) + } +} + +func init() { + cobra.OnInitialize(initConfig) + + // Global flags + rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default is $HOME/.s3usage.yaml)") + rootCmd.PersistentFlags().StringVar(&config.S3Endpoint, "endpoint", "", "S3 endpoint URL") + rootCmd.PersistentFlags().StringVar(&config.S3AccessKey, "access-key", "", "S3 access key") + rootCmd.PersistentFlags().StringVar(&config.S3SecretKey, "secret-key", "", "S3 secret key") + rootCmd.PersistentFlags().StringVar(&config.S3Region, "region", "default", "S3 region") + rootCmd.PersistentFlags().StringVar(&config.DBPath, "db", defaultDB, "SQLite database path") +} + +// initConfig reads in config file if set. +func initConfig() { + // If config file path is provided, use it + if cfgFile != "" { + // TODO: Read config from file + // For now, we'll just use command line flags + } + + // Environment variables can override config + if os.Getenv("S3_ENDPOINT") != "" { + config.S3Endpoint = os.Getenv("S3_ENDPOINT") + } + if os.Getenv("S3_ACCESS_KEY") != "" { + config.S3AccessKey = os.Getenv("S3_ACCESS_KEY") + } + if os.Getenv("S3_SECRET_KEY") != "" { + config.S3SecretKey = os.Getenv("S3_SECRET_KEY") + } + if os.Getenv("S3_REGION") != "" { + config.S3Region = os.Getenv("S3_REGION") + } + if os.Getenv("S3_DB_PATH") != "" { + config.DBPath = os.Getenv("S3_DB_PATH") + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..51b1170 --- /dev/null +++ b/go.mod @@ -0,0 +1,28 @@ +module github.com/thannaske/s3usage + +go 1.23.1 + +require ( + github.com/aws/aws-sdk-go-v2 v1.36.3 // indirect + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 // indirect + github.com/aws/aws-sdk-go-v2/config v1.29.8 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.17.61 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30 // indirect + 
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.34 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.6.2 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.15 // indirect + github.com/aws/aws-sdk-go-v2/service/s3 v1.78.0 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.25.0 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.29.0 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.33.16 // indirect + github.com/aws/smithy-go v1.22.2 // indirect + github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/mattn/go-sqlite3 v1.14.24 // indirect + github.com/spf13/cobra v1.9.1 // indirect + github.com/spf13/pflag v1.0.6 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..46c99f2 --- /dev/null +++ b/go.sum @@ -0,0 +1,48 @@ +github.com/aws/aws-sdk-go-v2 v1.36.3 h1:mJoei2CxPutQVxaATCzDUjcZEjVRdpsiiXi2o38yqWM= +github.com/aws/aws-sdk-go-v2 v1.36.3/go.mod h1:LLXuLpgzEbD766Z5ECcRmi8AzSwfZItDtmABVkRLGzg= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 h1:zAybnyUQXIZ5mok5Jqwlf58/TFE7uvd3IAsa1aF9cXs= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10/go.mod h1:qqvMj6gHLR/EXWZw4ZbqlPbQUyenf4h82UQUlKc+l14= +github.com/aws/aws-sdk-go-v2/config v1.29.8 h1:RpwAfYcV2lr/yRc4lWhUM9JRPQqKgKWmou3LV7UfWP4= +github.com/aws/aws-sdk-go-v2/config v1.29.8/go.mod h1:t+G7Fq1OcO8cXTPPXzxQSnj/5Xzdc9jAAD3Xrn9/Mgo= +github.com/aws/aws-sdk-go-v2/credentials v1.17.61 h1:Hd/uX6Wo2iUW1JWII+rmyCD7MMhOe7ALwQXN6sKDd1o= +github.com/aws/aws-sdk-go-v2/credentials v1.17.61/go.mod 
h1:L7vaLkwHY1qgW0gG1zG0z/X0sQ5tpIY5iI13+j3qI80= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30 h1:x793wxmUWVDhshP8WW2mlnXuFrO4cOd3HLBroh1paFw= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30/go.mod h1:Jpne2tDnYiFascUEs2AWHJL9Yp7A5ZVy3TNyxaAjD6M= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34 h1:ZK5jHhnrioRkUNOc+hOgQKlUL5JeC3S6JgLxtQ+Rm0Q= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34/go.mod h1:p4VfIceZokChbA9FzMbRGz5OV+lekcVtHlPKEO0gSZY= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34 h1:SZwFm17ZUNNg5Np0ioo/gq8Mn6u9w19Mri8DnJ15Jf0= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34/go.mod h1:dFZsC0BLo346mvKQLWmoJxT+Sjp+qcVR1tRVHQGOH9Q= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 h1:bIqFDwgGXXN1Kpp99pDOdKMTTb5d2KyU5X/BZxjOkRo= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3/go.mod h1:H5O/EsxDWyU+LP/V8i5sm8cxoZgc2fdNR9bxlOFrQTo= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.34 h1:ZNTqv4nIdE/DiBfUUfXcLZ/Spcuz+RjeziUtNJackkM= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.34/go.mod h1:zf7Vcd1ViW7cPqYWEHLHJkS50X0JS2IKz9Cgaj6ugrs= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3 h1:eAh2A4b5IzM/lum78bZ590jy36+d/aFLgKF/4Vd1xPE= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3/go.mod h1:0yKJC/kb8sAnmlYa6Zs3QVYqaC8ug2AbnNChv5Ox3uA= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.6.2 h1:t/gZFyrijKuSU0elA5kRngP/oU3mc0I+Dvp8HwRE4c0= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.6.2/go.mod h1:iu6FSzgt+M2/x3Dk8zhycdIcHjEFb36IS8HVUVFoMg0= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15 h1:dM9/92u2F1JbDaGooxTq18wmmFzbJRfXfVfy96/1CXM= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15/go.mod h1:SwFBy2vjtA0vZbjjaFtfN045boopadnoVPhu4Fv66vY= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.15 h1:moLQUoVq91LiqT1nbvzDukyqAlCv89ZmwaHw/ZFlFZg= 
+github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.15/go.mod h1:ZH34PJUc8ApjBIfgQCFvkWcUDBtl/WTD+uiYHjd8igA= +github.com/aws/aws-sdk-go-v2/service/s3 v1.78.0 h1:EBm8lXevBWe+kK9VOU/IBeOI189WPRwPUc3LvJK9GOs= +github.com/aws/aws-sdk-go-v2/service/s3 v1.78.0/go.mod h1:4qzsZSzB/KiX2EzDjs9D7A8rI/WGJxZceVJIHqtJjIU= +github.com/aws/aws-sdk-go-v2/service/sso v1.25.0 h1:2U9sF8nKy7UgyEeLiZTRg6ShBS22z8UnYpV6aRFL0is= +github.com/aws/aws-sdk-go-v2/service/sso v1.25.0/go.mod h1:qs4a9T5EMLl/Cajiw2TcbNt2UNo/Hqlyp+GiuG4CFDI= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.29.0 h1:wjAdc85cXdQR5uLx5FwWvGIHm4OPJhTyzUHU8craXtE= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.29.0/go.mod h1:MlYRNmYu/fGPoxBQVvBYr9nyr948aY/WLUvwBMBJubs= +github.com/aws/aws-sdk-go-v2/service/sts v1.33.16 h1:BHEK2Q/7CMRMCb3nySi/w8UbIcPhKvYP5s1xf8/izn0= +github.com/aws/aws-sdk-go-v2/service/sts v1.33.16/go.mod h1:cQnB8CUnxbMU82JvlqjKR2HBOm3fe9pWorWBza6MBJ4= +github.com/aws/smithy-go v1.22.2 h1:6D9hW43xKFrRx/tXXfAlIZc4JI+yQe6snnWcQyxSyLQ= +github.com/aws/smithy-go v1.22.2/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= +github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= +github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= +github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/mattn/go-sqlite3 v1.14.24 h1:tpSp2G2KyMnnQu99ngJ47EIkWVmliIizyZBfPrBWDRM= +github.com/mattn/go-sqlite3 v1.14.24/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/spf13/cobra v1.9.1 h1:CXSaggrXdbHK9CF+8ywj8Amf7PBRmPCOJugH954Nnlo= +github.com/spf13/cobra v1.9.1/go.mod h1:nDyEzZ8ogv936Cinf6g1RU9MRY64Ir93oCnqb9wxYW0= +github.com/spf13/pflag v1.0.6 h1:jFzHGLGAlb3ruxLB8MhbI6A8+AQX/2eW4qeyNZXNp2o= +github.com/spf13/pflag v1.0.6/go.mod 
h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/ceph/ceph.go b/pkg/ceph/ceph.go new file mode 100644 index 0000000..28efe5f --- /dev/null +++ b/pkg/ceph/ceph.go @@ -0,0 +1,254 @@ +package ceph + +import ( + "bytes" + "context" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "strings" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/thannaske/s3usage/pkg/models" +) + +// S3Client represents a client for interacting with Ceph S3 +type S3Client struct { + client *s3.Client + adminClient *http.Client + endpoint string + accessKey string + secretKey string + region string +} + +// NewS3Client creates a new Ceph S3 client +func NewS3Client(cfg models.Config) (*S3Client, error) { + // Create custom resolver to use the Ceph endpoint + customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) { + return aws.Endpoint{ + URL: cfg.S3Endpoint, + SigningRegion: cfg.S3Region, + HostnameImmutable: true, + }, nil + }) + + // Create AWS credentials with the provided access and secret keys + creds := credentials.NewStaticCredentialsProvider(cfg.S3AccessKey, cfg.S3SecretKey, "") + + // Load the AWS SDK configuration with custom resolver and credentials + awsCfg, err := config.LoadDefaultConfig( + context.TODO(), + config.WithCredentialsProvider(creds), + config.WithEndpointResolverWithOptions(customResolver), + config.WithRegion(cfg.S3Region), + ) + if err != nil { + return nil, fmt.Errorf("failed to load AWS SDK configuration: %w", err) + } + + // Create S3 client + s3Client := 
s3.NewFromConfig(awsCfg) + + // Create an HTTP client for admin operations + adminClient := &http.Client{ + Timeout: 30 * time.Second, + } + + return &S3Client{ + client: s3Client, + adminClient: adminClient, + endpoint: cfg.S3Endpoint, + accessKey: cfg.S3AccessKey, + secretKey: cfg.S3SecretKey, + region: cfg.S3Region, + }, nil +} + +// BucketStats represents the statistics of a bucket from Ceph RGW Admin API +type BucketStats struct { + Bucket string `json:"bucket"` + Usage Usage `json:"usage"` + OwnerID string `json:"id"` + OwnerName string `json:"owner"` + Zonegroup string `json:"zonegroup"` + PlacementID string `json:"placement_rule"` + Created string `json:"creation_time"` +} + +// Usage contains usage statistics for a bucket +type Usage struct { + RgwMain struct { + SizeKB int64 `json:"size_kb"` + SizeKBActual int64 `json:"size_kb_actual"` + NumObjects int64 `json:"num_objects"` + } `json:"rgw.main"` +} + +// executeSignedRequest executes an API request with proper AWS v4 signature +func (c *S3Client) executeSignedRequest(ctx context.Context, method, path string, queryParams url.Values, reqBody []byte) ([]byte, error) { + // Parse the endpoint URL + parsedURL, err := url.Parse(c.endpoint) + if err != nil { + return nil, fmt.Errorf("failed to parse endpoint URL: %w", err) + } + + // Set the path + if !strings.HasPrefix(path, "/") { + path = "/" + path + } + parsedURL.Path = path + + // Add query parameters if any + if queryParams != nil { + parsedURL.RawQuery = queryParams.Encode() + } + + // Prepare the request + var bodyReader io.ReadSeeker + if reqBody != nil { + bodyReader = bytes.NewReader(reqBody) + } else { + bodyReader = bytes.NewReader([]byte{}) + } + + req, err := http.NewRequestWithContext(ctx, method, parsedURL.String(), bodyReader) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + // Calculate sha256 hash of the request body + var hashBytes []byte + if reqBody != nil { + h := sha256.New() + h.Write(reqBody) + 
hashBytes = h.Sum(nil) + } else { + h := sha256.New() + hashBytes = h.Sum(nil) + } + payloadHash := hex.EncodeToString(hashBytes) + + // Create credentials + creds := aws.Credentials{ + AccessKeyID: c.accessKey, + SecretAccessKey: c.secretKey, + } + + // Sign the request - try both service names 'rgw' and 's3' + // Ceph documentation mentions it should be s3, but some deployments may use rgw + signer := v4.NewSigner() + + // Add the payload hash to the request headers + req.Header.Set("X-Amz-Content-Sha256", payloadHash) + + // Try with service name 's3' first (most common) + err = signer.SignHTTP(ctx, creds, req, payloadHash, "s3", c.region, time.Now()) + if err != nil { + return nil, fmt.Errorf("failed to sign request: %w", err) + } + + // Debug output for troubleshooting + fmt.Printf("Executing request: %s %s\n", method, req.URL.String()) + fmt.Printf("Authorization: %s\n", req.Header.Get("Authorization")) + + // Execute the request + resp, err := c.adminClient.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to execute request: %w", err) + } + defer resp.Body.Close() + + // Read the full response body + respBody, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to read response body: %w", err) + } + + // Check if the response was successful + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(respBody)) + } + + return respBody, nil +} + +// GetBuckets retrieves the list of buckets using the Admin API +func (c *S3Client) GetBuckets(ctx context.Context) ([]string, error) { + // Call the admin API to list buckets + respBody, err := c.executeSignedRequest(ctx, "GET", "/admin/bucket", nil, nil) + if err != nil { + return nil, fmt.Errorf("failed to list buckets with Admin API: %w", err) + } + + // Decode the response - it should be a simple array of strings + var bucketList []string + if err := json.Unmarshal(respBody, &bucketList); err != nil { + 
return nil, fmt.Errorf("failed to decode response: %w", err) + } + + return bucketList, nil +} + +// GetBucketUsage retrieves the usage statistics for a bucket using the Ceph RGW Admin API +func (c *S3Client) GetBucketUsage(ctx context.Context, bucketName string) (*models.BucketUsage, error) { + // Prepare query parameters + queryParams := url.Values{} + queryParams.Set("bucket", bucketName) + queryParams.Set("stats", "true") + + // Call the admin API to get bucket stats + respBody, err := c.executeSignedRequest(ctx, "GET", "/admin/bucket", queryParams, nil) + if err != nil { + return nil, fmt.Errorf("failed to get bucket stats: %w", err) + } + + // Decode the response + var bucketStats BucketStats + if err := json.Unmarshal(respBody, &bucketStats); err != nil { + return nil, fmt.Errorf("failed to decode response: %w", err) + } + + // Create bucket usage object + usage := &models.BucketUsage{ + BucketName: bucketName, + SizeBytes: bucketStats.Usage.RgwMain.SizeKB * 1024, // Convert KB to bytes + ObjectCount: bucketStats.Usage.RgwMain.NumObjects, + Timestamp: time.Now().UTC(), + } + + return usage, nil +} + +// GetAllBucketsUsage retrieves usage statistics for all buckets +func (c *S3Client) GetAllBucketsUsage(ctx context.Context) ([]models.BucketUsage, error) { + // Get list of buckets + buckets, err := c.GetBuckets(ctx) + if err != nil { + return nil, err + } + + // Get usage stats for each bucket + var usages []models.BucketUsage + for _, bucketName := range buckets { + fmt.Printf("Collecting statistics for bucket: %s\n", bucketName) + usage, err := c.GetBucketUsage(ctx, bucketName) + if err != nil { + // Log error but continue with other buckets + fmt.Printf("Error getting usage for bucket %s: %v\n", bucketName, err) + continue + } + usages = append(usages, *usage) + } + + return usages, nil +} diff --git a/pkg/db/db.go b/pkg/db/db.go new file mode 100644 index 0000000..0fd6216 --- /dev/null +++ b/pkg/db/db.go @@ -0,0 +1,304 @@ +package db + +import ( + 
"database/sql" + "fmt" + "time" + + _ "github.com/mattn/go-sqlite3" + "github.com/thannaske/s3usage/pkg/models" +) + +// DB represents the database connection +type DB struct { + *sql.DB +} + +// NewDB creates a new database connection +func NewDB(dbPath string) (*DB, error) { + db, err := sql.Open("sqlite3", dbPath) + if err != nil { + return nil, err + } + + if err := db.Ping(); err != nil { + return nil, err + } + + return &DB{db}, nil +} + +// InitDB initializes the database tables +func (db *DB) InitDB() error { + // Create bucket_usage table + _, err := db.Exec(` + CREATE TABLE IF NOT EXISTS bucket_usage ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + bucket_name TEXT NOT NULL, + size_bytes INTEGER NOT NULL, + object_count INTEGER NOT NULL, + timestamp DATETIME NOT NULL + ) + `) + if err != nil { + return err + } + + // Create monthly_averages table + _, err = db.Exec(` + CREATE TABLE IF NOT EXISTS monthly_averages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + bucket_name TEXT NOT NULL, + year INTEGER NOT NULL, + month INTEGER NOT NULL, + avg_size_bytes REAL NOT NULL, + avg_object_count REAL NOT NULL, + data_points INTEGER NOT NULL, + UNIQUE(bucket_name, year, month) + ) + `) + if err != nil { + return err + } + + // Create an index on bucket_name and timestamp for faster queries + _, err = db.Exec(` + CREATE INDEX IF NOT EXISTS idx_bucket_usage_name_time + ON bucket_usage(bucket_name, timestamp) + `) + return err +} + +// StoreBucketUsage stores the bucket usage data in the database +func (db *DB) StoreBucketUsage(usage models.BucketUsage) error { + _, err := db.Exec(` + INSERT INTO bucket_usage (bucket_name, size_bytes, object_count, timestamp) + VALUES (?, ?, ?, ?) 
+ `, usage.BucketName, usage.SizeBytes, usage.ObjectCount, usage.Timestamp) + return err +} + +// GetBucketUsage retrieves the usage data for a specific bucket +func (db *DB) GetBucketUsage(bucketName string, startTime, endTime time.Time) ([]models.BucketUsage, error) { + rows, err := db.Query(` + SELECT id, bucket_name, size_bytes, object_count, timestamp + FROM bucket_usage + WHERE bucket_name = ? AND timestamp BETWEEN ? AND ? + ORDER BY timestamp + `, bucketName, startTime, endTime) + if err != nil { + return nil, err + } + defer rows.Close() + + var usages []models.BucketUsage + for rows.Next() { + var u models.BucketUsage + if err := rows.Scan(&u.ID, &u.BucketName, &u.SizeBytes, &u.ObjectCount, &u.Timestamp); err != nil { + return nil, err + } + usages = append(usages, u) + } + + if err = rows.Err(); err != nil { + return nil, err + } + + return usages, nil +} + +// CalculateMonthlyAverages calculates the monthly averages for all buckets +func (db *DB) CalculateMonthlyAverages(year, month int) error { + startDate := time.Date(year, time.Month(month), 1, 0, 0, 0, 0, time.UTC) + endDate := startDate.AddDate(0, 1, 0).Add(-time.Second) + + // Get all unique bucket names for the given month + bucketRows, err := db.Query(` + SELECT DISTINCT bucket_name + FROM bucket_usage + WHERE timestamp BETWEEN ? AND ? + `, startDate, endDate) + if err != nil { + return err + } + defer bucketRows.Close() + + var buckets []string + for bucketRows.Next() { + var bucket string + if err := bucketRows.Scan(&bucket); err != nil { + return err + } + buckets = append(buckets, bucket) + } + + // For each bucket, calculate the average + for _, bucketName := range buckets { + // Calculate averages + var avgSize float64 + var avgCount float64 + var dataPoints int + err := db.QueryRow(` + SELECT AVG(size_bytes), AVG(object_count), COUNT(*) + FROM bucket_usage + WHERE bucket_name = ? AND timestamp BETWEEN ? AND ? 
		`, bucketName, startDate, endDate).Scan(&avgSize, &avgCount, &dataPoints)
		if err != nil {
			return err
		}

		// Insert or update the monthly average. The upsert is keyed on the
		// UNIQUE(bucket_name, year, month) constraint, so recalculating a
		// month overwrites the previous aggregate instead of failing.
		_, err = db.Exec(`
			INSERT INTO monthly_averages
			(bucket_name, year, month, avg_size_bytes, avg_object_count, data_points)
			VALUES (?, ?, ?, ?, ?, ?)
			ON CONFLICT(bucket_name, year, month)
			DO UPDATE SET
				avg_size_bytes = excluded.avg_size_bytes,
				avg_object_count = excluded.avg_object_count,
				data_points = excluded.data_points
		`, bucketName, year, month, avgSize, avgCount, dataPoints)
		if err != nil {
			return err
		}
	}

	return nil
}

// GetMonthlyAverage gets the monthly average for a specific bucket.
// When no aggregate row exists it returns a descriptive error rather than
// sql.ErrNoRows, so callers get a human-readable message.
func (db *DB) GetMonthlyAverage(bucketName string, year, month int) (*models.MonthlyBucketAverage, error) {
	var avg models.MonthlyBucketAverage
	err := db.QueryRow(`
		SELECT bucket_name, year, month, avg_size_bytes, avg_object_count, data_points
		FROM monthly_averages
		WHERE bucket_name = ? AND year = ? AND month = ?
	`, bucketName, year, month).Scan(
		&avg.BucketName, &avg.Year, &avg.Month,
		&avg.AvgSizeBytes, &avg.AvgObjectCount, &avg.DataPoints,
	)
	// QueryRow returns its error from Scan, so a direct comparison with
	// sql.ErrNoRows is reliable here (the error is not wrapped).
	if err == sql.ErrNoRows {
		return nil, fmt.Errorf("no data available for bucket %s in %d-%02d", bucketName, year, month)
	}
	if err != nil {
		return nil, err
	}
	return &avg, nil
}

// GetAllMonthlyAverages gets all monthly averages for a specific month,
// ordered by bucket name.
func (db *DB) GetAllMonthlyAverages(year, month int) ([]models.MonthlyBucketAverage, error) {
	rows, err := db.Query(`
		SELECT bucket_name, year, month, avg_size_bytes, avg_object_count, data_points
		FROM monthly_averages
		WHERE year = ? AND month = ?
+ ORDER BY bucket_name + `, year, month) + if err != nil { + return nil, err + } + defer rows.Close() + + var averages []models.MonthlyBucketAverage + for rows.Next() { + var avg models.MonthlyBucketAverage + if err := rows.Scan( + &avg.BucketName, &avg.Year, &avg.Month, + &avg.AvgSizeBytes, &avg.AvgObjectCount, &avg.DataPoints, + ); err != nil { + return nil, err + } + averages = append(averages, avg) + } + + if err = rows.Err(); err != nil { + return nil, err + } + + return averages, nil +} + +// PruneOldData removes individual bucket usage data points from months that have +// already been aggregated into monthly averages. +// It keeps data from the current month and any months that don't have averages calculated. +func (db *DB) PruneOldData() (int64, error) { + // Get the current date + now := time.Now() + + // Start of the current month + currentMonthStart := time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, time.UTC) + + // Begin a transaction + tx, err := db.Begin() + if err != nil { + return 0, fmt.Errorf("failed to begin transaction: %w", err) + } + defer tx.Rollback() // Rollback if not committed + + // Get a list of months for which we have monthly averages + rows, err := tx.Query(` + SELECT DISTINCT year, month + FROM monthly_averages + ORDER BY year, month + `) + if err != nil { + return 0, fmt.Errorf("failed to query monthly averages: %w", err) + } + defer rows.Close() + + var completedMonths []time.Time + for rows.Next() { + var year, month int + if err := rows.Scan(&year, &month); err != nil { + return 0, fmt.Errorf("failed to scan monthly average row: %w", err) + } + + // Convert to time.Time for easier comparison + monthStart := time.Date(year, time.Month(month), 1, 0, 0, 0, 0, time.UTC) + + // Only include months that are completed (before the current month) + if monthStart.Before(currentMonthStart) { + completedMonths = append(completedMonths, monthStart) + } + } + + if err = rows.Err(); err != nil { + return 0, fmt.Errorf("error iterating 
monthly average rows: %w", err) + } + + if len(completedMonths) == 0 { + // No pruning needed + return 0, nil + } + + // For each completed month, delete the individual data points + var totalDeleted int64 = 0 + for _, monthStart := range completedMonths { + // End of the month + monthEnd := monthStart.AddDate(0, 1, 0).Add(-time.Second) + + // Delete all individual data points for this month + result, err := tx.Exec(` + DELETE FROM bucket_usage + WHERE timestamp >= ? AND timestamp <= ? + `, monthStart, monthEnd) + if err != nil { + return 0, fmt.Errorf("failed to delete data points for %s: %w", + monthStart.Format("2006-01"), err) + } + + rowsAffected, err := result.RowsAffected() + if err != nil { + return 0, fmt.Errorf("failed to get rows affected: %w", err) + } + + totalDeleted += rowsAffected + } + + // Commit the transaction + if err := tx.Commit(); err != nil { + return 0, fmt.Errorf("failed to commit transaction: %w", err) + } + + return totalDeleted, nil +} diff --git a/pkg/models/models.go b/pkg/models/models.go new file mode 100644 index 0000000..21019da --- /dev/null +++ b/pkg/models/models.go @@ -0,0 +1,33 @@ +package models + +import ( + "time" +) + +// BucketUsage represents the disk usage for a single bucket at a specific point in time +type BucketUsage struct { + ID int64 `json:"id"` + BucketName string `json:"bucket_name"` + SizeBytes int64 `json:"size_bytes"` + ObjectCount int64 `json:"object_count"` + Timestamp time.Time `json:"timestamp"` +} + +// MonthlyBucketAverage represents the average disk usage for a bucket over a month +type MonthlyBucketAverage struct { + BucketName string `json:"bucket_name"` + Year int `json:"year"` + Month int `json:"month"` + AvgSizeBytes float64 `json:"avg_size_bytes"` + AvgObjectCount float64 `json:"avg_object_count"` + DataPoints int `json:"data_points"` +} + +// Config represents the application configuration +type Config struct { + S3Endpoint string `json:"s3_endpoint"` + S3AccessKey string 
`json:"s3_access_key"`
	S3SecretKey string `json:"s3_secret_key"` // secret key paired with the access key
	S3Region    string `json:"s3_region"`     // region name used when signing requests
	DBPath      string `json:"db_path"`       // filesystem path to the SQLite database file
}
\ No newline at end of file