From 63761298276d70e4419103a3dde98817305d158f Mon Sep 17 00:00:00 2001 From: Shane Date: Thu, 1 May 2025 17:31:27 +1000 Subject: [PATCH] add back in DDG search for flickr --- foodie_utils.py | 278 ++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 258 insertions(+), 20 deletions(-) diff --git a/foodie_utils.py b/foodie_utils.py index 035c914..ce3f40e 100644 --- a/foodie_utils.py +++ b/foodie_utils.py @@ -225,7 +225,6 @@ def get_image(search_query): flickr_request_count += 1 logging.info(f"Flickr request count: {flickr_request_count}/3600") - # Enforce a minimum delay of 1 second between Flickr requests current_time = time.time() time_since_last_request = current_time - last_flickr_request_time if time_since_last_request < 1: @@ -235,7 +234,6 @@ def get_image(search_query): headers = {'User-Agent': 'InsiderFoodieBot/1.0 (https://insiderfoodie.com; contact@insiderfoodie.com)'} - # Helper function to search Flickr with a given query def search_flickr(query, per_page=20): try: photos = flickr_api.Photo.search( @@ -251,7 +249,14 @@ def get_image(search_query): logging.warning(f"Flickr API error for query '{query}': {e}") return [] - # Helper function to process a photo + def fetch_photo_by_id(photo_id): + try: + photo = flickr_api.Photo(id=photo_id) + return photo + except Exception as e: + logging.warning(f"Failed to fetch Flickr photo ID {photo_id}: {e}") + return None + def process_photo(photo): tags = [tag.text.lower() for tag in photo.getTags()] title = photo.title.lower() if photo.title else "" @@ -320,7 +325,28 @@ def get_image(search_query): if temp_file and os.path.exists(temp_path): os.unlink(temp_path) - # Helper function to classify keywords as specific or generic + def search_ddg_for_flickr(query): + ddg_query = f"{query} site:flickr.com" + ddg_url = f"https://duckduckgo.com/?q={quote(ddg_query)}" + try: + response = requests.get(ddg_url, headers=headers, timeout=10) + response.raise_for_status() + soup = BeautifulSoup(response.text, 'html.parser') + + photo_ids = set() + for link in soup.find_all('a', href=True): + href = link['href'] + match = re.search(r'flickr\.com/photos/[^/]+/(\d+)', href) + if match: + photo_id = match.group(1) + photo_ids.add(photo_id) + + logging.info(f"Found {len(photo_ids)} Flickr photo IDs via DDG: {photo_ids}") + return photo_ids + except Exception as e: + logging.warning(f"DDG search failed for query '{ddg_query}': {e}") + return set() + def classify_keywords(keywords): prompt = ( "Given the following keywords from an image search query, classify each as 'specific' (e.g., brand names, unique entities) or 'generic' (e.g., common or abstract terms). " @@ -330,15 +356,14 @@ def get_image(search_query): "```json\n" "{\n" " \"Wingstop\": \"specific\",\n" - " \"Smart\": \"generic\",\n" - " \"Kitchen\": \"generic\"\n" + " \"dining\": \"generic\"\n" "}\n```" ) try: response = client.chat.completions.create( model=LIGHT_TASK_MODEL, messages=[ - {"role": "system", "content": "You are a helpful assistant that classifies keywords."}, + {"role": "system", "content": "You are a helper that classifies keywords."}, {"role": "user", "content": prompt} ], max_tokens=100, @@ -356,21 +381,23 @@ def get_image(search_query): logging.warning(f"Keyword classification failed: {e}. Defaulting to all specific.") return {kw: "specific" for kw in keywords} - # Step 1: Try the original search query on Flickr - logging.info(f"Searching Flickr with original query: '{search_query}'") - photos = search_flickr(search_query) - for photo in photos: - result = process_photo(photo) - if result: - return result + # Step 1: Search DDG to find Flickr photo IDs + logging.info(f"Searching DDG with query: '{search_query} site:flickr.com'") + photo_ids = search_ddg_for_flickr(search_query) + if photo_ids: + for photo_id in photo_ids: + photo = fetch_photo_by_id(photo_id) + if photo: + result = process_photo(photo) + if result: + return result - # Step 2: Break down the query into keywords and classify them + # Step 2: Break down the query into keywords and classify them for direct Flickr API search keywords = search_query.lower().split() if len(keywords) > 1: classifications = classify_keywords(keywords) logging.info(f"Keyword classifications: {classifications}") - # Prioritize specific keywords specific_keywords = [kw for kw, classification in classifications.items() if classification == "specific"] if specific_keywords: for keyword in specific_keywords: @@ -382,10 +409,8 @@ def get_image(search_query): return result # Step 3: Final fallback to a generic food-related query - # Use a simple generic query derived from context (e.g., "food dining") - fallback_query = "food dining" # This could be further contextualized if needed - logging.info(f"No results found. Falling back to generic query: '{fallback_query}'") - photos = search_flickr(fallback_query) + logging.info(f"No results found. Falling back to generic query: 'food dining'") + photos = search_flickr("food dining") for photo in photos: result = process_photo(photo) if result: @@ -999,6 +1024,219 @@ def reset_flickr_request_count(): if time.time() - flickr_request_start_time >= 3600: # Reset every hour flickr_request_count = 0 flickr_request_start_time = time.time() + +def get_flickr_image(search_query, relevance_keywords): + global last_flickr_request_time, flickr_request_count + + reset_flickr_request_count() + flickr_request_count += 1 + logging.info(f"Flickr request count: {flickr_request_count}/3600") + + # Enforce a minimum delay of 1 second between Flickr requests + current_time = time.time() + time_since_last_request = current_time - last_flickr_request_time + if time_since_last_request < 1: + time.sleep(1 - time_since_last_request) + + last_flickr_request_time = time.time() + + headers = {'User-Agent': 'InsiderFoodieBot/1.0 (https://insiderfoodie.com; contact@insiderfoodie.com)'} + + # Helper function to search Flickr with a given query + def search_flickr(query, per_page=20): + try: + photos = flickr_api.Photo.search( + text=query, + per_page=per_page, + sort='relevance', + safe_search=1, + media='photos', + license='4,5,9,10' + ) + return photos + except Exception as e: + logging.warning(f"Flickr API error for query '{query}': {e}") + return [] + + # Helper function to fetch a Flickr photo by ID + def fetch_photo_by_id(photo_id): + try: + photo = flickr_api.Photo(id=photo_id) + return photo + except Exception as e: + logging.warning(f"Failed to fetch Flickr photo ID {photo_id}: {e}") + return None + + # Helper function to process a photo + def process_photo(photo): + tags = [tag.text.lower() for tag in photo.getTags()] + title = photo.title.lower() if photo.title else "" + + matched_keywords = [kw for kw in exclude_keywords if kw in tags or kw in title] + if matched_keywords: + logging.info(f"Skipping image with unwanted keywords: {photo.id} (tags: {tags}, title: {title}, matched: {matched_keywords})") + return None + + img_url = photo.getPhotoFile(size_label='Large') + if not img_url: + img_url = photo.getPhotoFile(size_label='Medium') + if not img_url or img_url in used_images: + return None + + temp_file = None + try: + img_response = requests.get(img_url, headers=headers, timeout=10) + img_response.raise_for_status() + with tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') as temp_file: + temp_file.write(img_response.content) + temp_path = temp_file.name + + img = Image.open(temp_path) + text = pytesseract.image_to_string(img) + char_count = len(text.strip()) + logging.info(f"OCR processed {img_url}: {char_count} characters detected") + + if char_count > 200: + logging.info(f"Skipping text-heavy image (OCR): {img_url} (char_count: {char_count})") + return None + + uploader = photo.owner.username + page_url = f"https://www.flickr.com/photos/{photo.owner.nsid}/{photo.id}" + + used_images.add(img_url) + save_used_images() + + flickr_data = { + "title": search_query, + "image_url": img_url, + "source": "Flickr", + "uploader": uploader, + "page_url": page_url, + "timestamp": datetime.now(timezone.utc).isoformat(), + "ocr_chars": char_count + } + flickr_file = "/home/shane/foodie_automator/flickr_images.json" + with open(flickr_file, 'a') as f: + json.dump(flickr_data, f) + f.write('\n') + logging.info(f"Saved Flickr image to {flickr_file}: {img_url}") + + logging.info(f"Fetched Flickr image: {img_url} by {uploader} for query '{search_query}' (tags: {tags})") + return img_url, "Flickr", uploader, page_url + + except requests.exceptions.HTTPError as e: + if e.response.status_code == 429: + logging.warning(f"Rate limit hit for {img_url}. Falling back to Pixabay.") + return None + else: + logging.warning(f"Download failed for {img_url}: {e}") + return None + except Exception as e: + logging.warning(f"OCR processing failed for {img_url}: {e}") + return None + finally: + if temp_file and os.path.exists(temp_path): + os.unlink(temp_path) + + # Helper function to search DDG and extract Flickr photo IDs + def search_ddg_for_flickr(query): + ddg_query = f"{query} site:flickr.com" + ddg_url = f"https://duckduckgo.com/?q={quote(ddg_query)}" + try: + response = requests.get(ddg_url, headers=headers, timeout=10) + response.raise_for_status() + soup = BeautifulSoup(response.text, 'html.parser') + + photo_ids = set() + # Look for Flickr URLs in the search results + for link in soup.find_all('a', href=True): + href = link['href'] + # Match Flickr photo URLs like https://www.flickr.com/photos/username/1234567890 + match = re.search(r'flickr\.com/photos/[^/]+/(\d+)', href) + if match: + photo_id = match.group(1) + photo_ids.add(photo_id) + + logging.info(f"Found {len(photo_ids)} Flickr photo IDs via DDG: {photo_ids}") + return photo_ids + except Exception as e: + logging.warning(f"DDG search failed for query '{ddg_query}': {e}") + return set() + + # Helper function to classify keywords as specific or generic + def classify_keywords(keywords): + prompt = ( + "Given the following keywords from an image search query, classify each as 'specific' (e.g., brand names, unique entities) or 'generic' (e.g., common or abstract terms). " + "Return a JSON object mapping each keyword to its classification.\n\n" + "Keywords: " + ", ".join(keywords) + "\n\n" + "Example output:\n" + "```json\n" + "{\n" + " \"Wingstop\": \"specific\",\n" + " \"dining\": \"generic\"\n" + "}\n```" + ) + try: + response = client.chat.completions.create( + model=LIGHT_TASK_MODEL, + messages=[ + {"role": "system", "content": "You are a helper that classifies keywords."}, + {"role": "user", "content": prompt} + ], + max_tokens=100, + temperature=0.5 + ) + raw_response = response.choices[0].message.content + json_match = re.search(r'```json\n([\s\S]*?)\n```', raw_response) + if not json_match: + logging.warning(f"Failed to parse keyword classification JSON: {raw_response}") + return {kw: "specific" for kw in keywords} + + classifications = json.loads(json_match.group(1)) + return classifications + except Exception as e: + logging.warning(f"Keyword classification failed: {e}. Defaulting to all specific.") + return {kw: "specific" for kw in keywords} + + # Step 1: Search DDG to find Flickr photo IDs + logging.info(f"Searching DDG with query: '{search_query} site:flickr.com'") + photo_ids = search_ddg_for_flickr(search_query) + if photo_ids: + for photo_id in photo_ids: + photo = fetch_photo_by_id(photo_id) + if photo: + result = process_photo(photo) + if result: + return result + + # Step 2: Break down the query into keywords and classify them for direct Flickr API search + keywords = search_query.lower().split() + if len(keywords) > 1: + classifications = classify_keywords(keywords) + logging.info(f"Keyword classifications: {classifications}") + + # Prioritize specific keywords + specific_keywords = [kw for kw, classification in classifications.items() if classification == "specific"] + if specific_keywords: + for keyword in specific_keywords: + logging.info(f"Searching Flickr with specific keyword: '{keyword}'") + photos = search_flickr(keyword) + for photo in photos: + result = process_photo(photo) + if result: + return result + + # Step 3: Final fallback using relevance keywords + fallback_query = " ".join(relevance_keywords) if isinstance(relevance_keywords, list) else relevance_keywords + logging.info(f"No results found. Falling back to generic query: '{fallback_query}'") + photos = search_flickr(fallback_query) + for photo in photos: + result = process_photo(photo) + if result: + return result + + logging.warning(f"No valid Flickr image found for query '{search_query}' after all attempts.") + return None, None, None, None def select_best_author(summary): try: