Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions src/gxs/rsgenexchange.h
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,16 @@ class RsGenExchange : public RsNxsObserver, public RsTickingThread, public RsGxs
* GXS_SERV::GXS_MSG_STATUS_UNPROCESSED and GXS_SERV::GXS_MSG_STATUS_UNREAD
* @param changes the changes that have occured to data held by this service
*/
/*!
* processes msg local meta changes
*/
void processMsgMetaChanges();

/*!
* Processes group local meta changes
*/
void processGrpMetaChanges();

virtual void notifyChanges(std::vector<RsGxsNotify*>& changes) = 0;

private:
Expand All @@ -805,15 +815,7 @@ class RsGenExchange : public RsNxsObserver, public RsTickingThread, public RsGxs

bool checkGroupMetaConsistency(const RsGroupMetaData& meta);

/*!
* processes msg local meta changes
*/
void processMsgMetaChanges();

/*!
* Processes group local meta changes
*/
void processGrpMetaChanges();
// Moved to protected

/*!
* Convenience function for properly applying masks for status and subscribe flag
Expand Down
20 changes: 11 additions & 9 deletions src/gxs/rsgxsdataaccess.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1195,6 +1195,14 @@ bool RsGxsDataAccess::getMsgMetaDataList( const GxsMsgReq& msgIds, const RsTokRe
metaV[i] = nullptr;
continue;
}

// Apply mStatusMask/mStatusFilter if specified (fixes bug where all msgs were returned
// even when filtering for UNPROCESSED status in request_GroupUnprocessedPosts)
if (!checkMsgFilter(opts, msgMeta))
{
metaV[i] = nullptr;
continue;
}
}
}

Expand Down Expand Up @@ -1229,6 +1237,9 @@ bool RsGxsDataAccess::getMsgIdList( const GxsMsgReq& msgIds, const RsTokReqOptio

for(auto it(result.begin());it!=result.end();++it)
{
if (it->second.empty())
continue;

auto& id_set(msgIdsOut[it->first]);

for(uint32_t i=0;i<it->second.size();++i)
Expand Down Expand Up @@ -1890,15 +1901,6 @@ bool RsGxsDataAccess::checkMsgFilter(const RsTokReqOptions& opts, const std::sha
}
else
{
#ifdef DATA_DEBUG
GXSDATADEBUG << __PRETTY_FUNCTION__
<< " Dropping Msg due to !StatusMatch "
<< " Mask: " << opts.mStatusMask
<< " StatusFilter: " << opts.mStatusFilter
<< " MsgStatus: " << meta->mMsgStatus
<< " MsgId: " << meta->mMsgId << std::endl;
#endif

return false;
}
}
Expand Down
40 changes: 32 additions & 8 deletions src/services/p3gxschannels.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1075,7 +1075,10 @@ void p3GxsChannels::load_unprocessedPosts(uint32_t token)
#endif

std::vector<RsGxsChannelPost> posts;
if (!getPostData(token, posts))
std::vector<RsGxsComment> comments;
std::vector<RsGxsVote> votes;

if (!getPostData(token, posts, comments, votes))
{
std::cerr << __PRETTY_FUNCTION__ << " ERROR getting post data!"
<< std::endl;
Expand All @@ -1088,6 +1091,23 @@ void p3GxsChannels::load_unprocessedPosts(uint32_t token)
/* autodownload the files */
handleUnprocessedPost(*it);
}

for(const auto& comment: comments)
{
uint32_t token;
RsGxsGrpMsgIdPair msgId(comment.mMeta.mGroupId, comment.mMeta.mMsgId);
setMessageProcessedStatus(token, msgId, true);
}

for(const auto& vote: votes)
{
uint32_t token;
RsGxsGrpMsgIdPair msgId(vote.mMeta.mGroupId, vote.mMeta.mMsgId);
setMessageProcessedStatus(token, msgId, true);
}

/* Force processing of meta changes immediately to ensure persistence */
processMsgMetaChanges();
}

void p3GxsChannels::handleUnprocessedPost(const RsGxsChannelPost &msg)
Expand All @@ -1099,8 +1119,11 @@ void p3GxsChannels::handleUnprocessedPost(const RsGxsChannelPost &msg)

if (!IS_MSG_UNPROCESSED(msg.mMeta.mMsgStatus))
{
std::cerr << __PRETTY_FUNCTION__ << " ERROR Msg already Processed! "
<< "mMsgId: " << msg.mMeta.mMsgId << std::endl;
/* Even though the serialized item says processed, the DB metadata may still say UNPROCESSED
* (they can be out of sync). Force marking as processed to fix the DB metadata. */
uint32_t token;
RsGxsGrpMsgIdPair msgId(msg.mMeta.mGroupId, msg.mMeta.mMsgId);
setMessageProcessedStatus(token, msgId, true);
return;
}

Expand Down Expand Up @@ -1153,11 +1176,6 @@ void p3GxsChannels::handleUnprocessedPost(const RsGxsChannelPost &msg)
<< std::endl;
}
}

/* mark as processed */
uint32_t token;
RsGxsGrpMsgIdPair msgId(msg.mMeta.mGroupId, msg.mMeta.mMsgId);
setMessageProcessedStatus(token, msgId, true);
}
#ifdef GXSCHANNELS_DEBUG
else
Expand All @@ -1166,6 +1184,12 @@ void p3GxsChannels::handleUnprocessedPost(const RsGxsChannelPost &msg)
std::cerr << std::endl;
}
#endif

/* Always mark as processed, regardless of autodownload setting.
* This prevents the message from being loaded again at every startup. */
uint32_t token;
RsGxsGrpMsgIdPair msgId(msg.mMeta.mGroupId, msg.mMeta.mMsgId);
setMessageProcessedStatus(token, msgId, true);
}


Expand Down
Loading