# app_final.py - Version dengan bucket management dan I/O fix import os import boto3 from flask import Flask, request, render_template, flash, redirect, url_for, jsonify from werkzeug.utils import secure_filename from botocore.exceptions import ClientError from botocore.config import Config from dotenv import load_dotenv import uuid from datetime import datetime import urllib3 import io # Disable SSL warnings untuk development urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # Load environment variables load_dotenv() app = Flask(__name__) app.secret_key = 'your-secret-key-here' # Konfigurasi NEO Object Storage NEO_CONFIG = { 'access_key': os.getenv('NEO_ACCESS_KEY'), 'secret_key': os.getenv('NEO_SECRET_KEY'), 'endpoint': os.getenv('NEO_ENDPOINT'), 'bucket': os.getenv('NEO_BUCKET'), 'use_ssl': os.getenv('NEO_USE_SSL', 'false').lower() == 'true' } # Inisialisasi S3 client untuk NEO def get_s3_client(): if not NEO_CONFIG['use_ssl']: endpoint = NEO_CONFIG['endpoint'].replace('https://', 'http://') use_ssl = False else: endpoint = NEO_CONFIG['endpoint'] use_ssl = True return boto3.client( 's3', endpoint_url=endpoint, aws_access_key_id=NEO_CONFIG['access_key'], aws_secret_access_key=NEO_CONFIG['secret_key'], region_name='wjv-1', verify=False if use_ssl else None, # Disable SSL verification jika HTTPS use_ssl=use_ssl, config=Config( signature_version='s3v4', retries={'max_attempts': 3}, region_name='wjv-1' ) ) # Fungsi untuk membuat bucket jika belum ada def create_bucket_if_not_exists(): try: s3_client = get_s3_client() # Cek apakah bucket sudah ada try: s3_client.head_bucket(Bucket=NEO_CONFIG['bucket']) return {'success': True, 'message': f'Bucket {NEO_CONFIG["bucket"]} sudah ada'} except ClientError as e: error_code = int(e.response['Error']['Code']) if error_code == 404: # Bucket tidak ada, buat baru try: s3_client.create_bucket( Bucket=NEO_CONFIG['bucket'], CreateBucketConfiguration={'LocationConstraint': 'wjv-1'} ) return {'success': True, 'message': f'Bucket {NEO_CONFIG["bucket"]} berhasil dibuat'} except ClientError as create_error: # Jika gagal buat bucket, coba tanpa LocationConstraint try: s3_client.create_bucket(Bucket=NEO_CONFIG['bucket']) return {'success': True, 'message': f'Bucket {NEO_CONFIG["bucket"]} berhasil dibuat (fallback)'} except Exception as final_error: return {'success': False, 'error': f'Gagal membuat bucket: {str(final_error)}'} else: return {'success': False, 'error': f'Error checking bucket: {str(e)}'} except Exception as e: return {'success': False, 'error': f'Error: {str(e)}'} # Fungsi upload dengan fix I/O dan bucket handling def upload_to_neo(file, folder="uploads"): try: # Baca file ke memory buffer untuk avoid I/O closed error file.seek(0) # Reset pointer ke awal file_content = file.read() file_buffer = io.BytesIO(file_content) # Reset file pointer untuk compatibility file.seek(0) s3_client = get_s3_client() # Generate nama file unik timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = secure_filename(file.filename) unique_filename = f"{timestamp}_{uuid.uuid4().hex[:8]}_{filename}" # Path di object storage object_key = f"{folder}/{unique_filename}" # Cek/buat bucket dulu bucket_result = create_bucket_if_not_exists() if not bucket_result['success']: return { 'success': False, 'error': f'Bucket error: {bucket_result["error"]}' } # Upload file menggunakan buffer try: s3_client.upload_fileobj( file_buffer, NEO_CONFIG['bucket'], object_key, ExtraArgs={ 'ACL': 'public-read', 'ContentType': file.content_type or 'application/octet-stream' } ) except ClientError as e: error_code = e.response['Error']['Code'] if error_code == 'NoSuchBucket': return { 'success': False, 'error': f'Bucket "{NEO_CONFIG["bucket"]}" tidak ditemukan. Coba buat bucket manual di portal NEO.' } elif error_code == 'AccessDenied': return { 'success': False, 'error': 'Access denied. Periksa Access Key dan Secret Key, atau permissions bucket.' } else: return { 'success': False, 'error': f'AWS Error ({error_code}): {e.response["Error"]["Message"]}' } # Generate URL public if NEO_CONFIG['use_ssl']: file_url = f"{NEO_CONFIG['endpoint']}/{NEO_CONFIG['bucket']}/{object_key}" else: http_endpoint = NEO_CONFIG['endpoint'].replace('https://', 'http://') file_url = f"{http_endpoint}/{NEO_CONFIG['bucket']}/{object_key}" return { 'success': True, 'url': file_url, 'filename': unique_filename, 'object_key': object_key, 'bucket_status': bucket_result['message'] } except Exception as e: error_msg = str(e) if 'SSL' in error_msg or 'certificate' in error_msg: return { 'success': False, 'error': f'SSL Error: {error_msg}. Set NEO_USE_SSL=false di .env file.' } else: return { 'success': False, 'error': f'Upload error: {error_msg}' } # Fungsi untuk list buckets (debugging) def list_buckets(): try: s3_client = get_s3_client() response = s3_client.list_buckets() buckets = [bucket['Name'] for bucket in response['Buckets']] return {'success': True, 'buckets': buckets} except Exception as e: return {'success': False, 'error': str(e)} # Fungsi untuk hapus file dari NEO def delete_from_neo(object_key): try: s3_client = get_s3_client() s3_client.delete_object( Bucket=NEO_CONFIG['bucket'], Key=object_key ) return {'success': True} except Exception as e: return {'success': False, 'error': str(e)} # Fungsi untuk list semua file def list_files_from_neo(folder="uploads"): try: s3_client = get_s3_client() # Cek bucket dulu bucket_result = create_bucket_if_not_exists() if not bucket_result['success']: return {'success': False, 'error': f'Bucket error: {bucket_result["error"]}'} response = s3_client.list_objects_v2( Bucket=NEO_CONFIG['bucket'], Prefix=folder + "/" if folder else "" ) files = [] if 'Contents' in response: for obj in response['Contents']: # Generate URL yang konsisten if NEO_CONFIG['use_ssl']: file_url = f"{NEO_CONFIG['endpoint']}/{NEO_CONFIG['bucket']}/{obj['Key']}" else: http_endpoint = NEO_CONFIG['endpoint'].replace('https://', 'http://') file_url = f"{http_endpoint}/{NEO_CONFIG['bucket']}/{obj['Key']}" files.append({ 'key': obj['Key'], 'filename': obj['Key'].split('/')[-1], 'size': obj['Size'], 'modified': obj['LastModified'].strftime("%Y-%m-%d %H:%M:%S"), 'url': file_url }) return {'success': True, 'files': files} except Exception as e: return {'success': False, 'error': str(e)} # Routes @app.route('/') def index(): return render_template('index.html') @app.route('/upload', methods=['GET', 'POST']) def upload_file(): if request.method == 'POST': if 'file' not in request.files: flash('Tidak ada file yang dipilih!', 'error') return redirect(request.url) file = request.files['file'] if file.filename == '': flash('Tidak ada file yang dipilih!', 'error') return redirect(request.url) if file: result = upload_to_neo(file, folder="uploads") if result['success']: flash(f'File berhasil diupload! URL: {result["url"]}', 'success') return redirect(url_for('view_files')) else: flash(f'Error upload: {result["error"]}', 'error') return render_template('upload.html') @app.route('/files') def view_files(): result = list_files_from_neo() if result['success']: return render_template('files.html', files=result['files']) else: flash(f'Error loading files: {result["error"]}', 'error') return render_template('files.html', files=[]) @app.route('/api/upload', methods=['POST']) def api_upload(): if 'file' not in request.files: return jsonify({'success': False, 'error': 'No file provided'}) file = request.files['file'] folder = request.form.get('folder', 'uploads') if file.filename == '': return jsonify({'success': False, 'error': 'No file selected'}) result = upload_to_neo(file, folder) return jsonify(result) @app.route('/api/delete', methods=['POST']) def api_delete(): data = request.get_json() object_key = data.get('object_key') if not object_key: return jsonify({'success': False, 'error': 'Object key required'}) result = delete_from_neo(object_key) return jsonify(result) @app.route('/api/files') def api_files(): folder = request.args.get('folder', 'uploads') result = list_files_from_neo(folder) return jsonify(result) # Route untuk test koneksi dan setup @app.route('/test-connection') def test_connection(): results = { 'bucket_check': create_bucket_if_not_exists(), 'list_buckets': list_buckets(), 'config': { 'endpoint': NEO_CONFIG['endpoint'], 'bucket': NEO_CONFIG['bucket'], 'use_ssl': NEO_CONFIG['use_ssl'], 'access_key': NEO_CONFIG['access_key'][:10] + '...' if NEO_CONFIG['access_key'] else 'Not set' } } return jsonify(results) # Route untuk setup bucket @app.route('/setup-bucket') def setup_bucket(): result = create_bucket_if_not_exists() return jsonify(result) # Route untuk list semua buckets @app.route('/list-buckets') def api_list_buckets(): result = list_buckets() return jsonify(result) if __name__ == '__main__': # Check konfigurasi required_configs = ['NEO_ACCESS_KEY', 'NEO_SECRET_KEY', 'NEO_ENDPOINT', 'NEO_BUCKET'] missing_configs = [config for config in required_configs if not os.getenv(config)] if missing_configs: print(f"❌ Error: Missing configurations: {', '.join(missing_configs)}") print("Please check your .env file") exit(1) print("🔧 Setting up NEO Object Storage...") # Test bucket setup bucket_result = create_bucket_if_not_exists() if bucket_result['success']: print(f"✅ {bucket_result['message']}") else: print(f"⚠️ {bucket_result['error']}") print("💡 Tip: Buat bucket manual di portal NEO atau cek credentials") # Test list buckets untuk debugging buckets_result = list_buckets() if buckets_result['success']: print(f"📁 Available buckets: {buckets_result['buckets']}") else: print(f"❌ Cannot list buckets: {buckets_result['error']}") app.run(debug=True, host='0.0.0.0', port=7600)